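/**
*
* @brief	Server connection state management service
* @author	yun
* @date		2016-12-21
* @desc     Creates and destroys the server connection threads and handles the data connection between the device and the server
* @version                                          
*		
* 
**/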

#include	"service_server.h"

// platform
#include	"platform_logger.h"

// rtos
#include	"cmsis_os.h"

// network
#include	"mxchipWNET.h"

// app
#include	"app_conf.h"
#include	"app_message.h"

// Forward declarations of the file-local threads and helpers:
// connection thread, notification (send) thread, connect / receive / dispatch
static void thread_server(void const *argument);
static void thread_server_notification(void const *argument);
static int server_connect(char *domain);
static int server_message_receive(int fd);
static int server_post_handler(char * post_str);

// Set-up and tear-down of the notification thread and its semaphore/queue
static int server_notification_establish(int *fd);
static int server_notification_destroy(void);

// Var
// Handles of the connection thread and the notification (send) thread
osThreadId		server_id = NULL;
osThreadId		server_notification_id = NULL;

//QueueHandle_t	message_send_id = NULL;
// Semaphore taken by server_send_data() and released by the notification thread
osSemaphoreId	send_sempahore_id = NULL;
// Queue through which server_send_data() tells the notification thread to send
osMessageQId	send_queue_id = NULL;

// Outgoing data buffer (filled by server_send_data(), sent by the notification thread)
char packet_send[APP_SERVER_PACKET_LEN_MAX] = { 0 };


// One subscription entry: a service name and the callback registered for it
typedef struct {
	char 	service[32];		// service name (NUL-terminated)
	server_post_callback_fn		callback_fn;   // callback function; NULL marks an unused entry
}server_post_t;

// Subscription table
server_post_t	server_post_arr[APP_SERVICE_CNT_MAX];

//
// server_post_subscribe: subscribe to one type of server push message
// service: NUL-terminated name of the message type to subscribe to; it must
//          fit in server_post_t.service including the terminator
// post_callback: function called by server_post_handler() with the message
//                type and data when a push of that type arrives
// Returns (0) on success; (-1) on a NULL argument, a name that is too long,
// or when the table holds neither an entry with this name nor a free slot
//
int server_post_subscribe(char * service, server_post_callback_fn post_callback){
	if(service == NULL || post_callback == NULL){
		return -1;
	}
	// Refuse names that would overflow the fixed-size service field
	if(strlen(service) >= sizeof(server_post_arr[0].service)){
		return -1;
	}
	// Look for the entry already registered under this name; failing that,
	// remember a free slot (the last one seen)
	int subscribe_index =  APP_SERVICE_CNT_MAX;
	for(int i=0; i<APP_SERVICE_CNT_MAX; i++){
		if(strcmp(service, server_post_arr[i].service) == 0){
			subscribe_index = i;
			break;
		}
		if(server_post_arr[i].callback_fn == NULL){
			subscribe_index = i;
		}
	}
	// No matching entry and no free slot
	if(subscribe_index == APP_SERVICE_CNT_MAX){
		return -1;
	}
	// Length was validated above, so this copy cannot overrun the field
	strcpy(server_post_arr[subscribe_index].service, service);
	server_post_arr[subscribe_index].callback_fn = post_callback;
	return 0;
}

//
// 服务连接初始化
//
int app_server_init(void){
	for(int i=0; i<APP_SERVICE_CNT_MAX; i++){
		memset(server_post_arr[i].service, 0x00, sizeof(server_post_arr[i].service));
		server_post_arr[i].callback_fn = NULL;
	}
	return 0;
}

// Establish the server connection: starts the connection thread (thread_server).
// Returns 0 when the thread was created, -1 otherwise.
int app_server_establish(void){
	osThreadDef(Server, thread_server, osPriorityNormal, 0, 1024);

	server_id = osThreadCreate(osThread(Server), NULL);
	return (server_id != NULL) ? 0 : -1;
}

// Destroy the server connection: stops the connection thread and tears down
// the notification thread together with its semaphore and queue.
// Always returns 0. Safe to call repeatedly: handles are cleared once released.
// NOTE(review): the socket opened by thread_server is a local of that thread
// and is not closed here.
int app_server_destroy(void){
	// Stop the connection thread first so it cannot re-create the
	// notification service while it is being torn down
	if (server_id != NULL){
		osThreadTerminate(server_id);
		server_id = NULL;
	}

	// Terminates the notification thread, deletes the send semaphore and
	// queue, and clears all three handles, so server_send_data() fails fast
	// instead of queueing data nobody will send
	server_notification_destroy();
	return 0;
}

// Server connection thread: connects to APP_SERVER_DOMAIN, receives data until
// the connection ends, then cleans up and reconnects. Never returns.
// argument: unused
static void thread_server(void const * argument){
	log_notice_printf("thread: server");

	int fd = -1;
	for(;;){
		// Disconnected: keep trying, pausing 2000 (osDelay) between attempts
		do {
			log_notice_printf("app connecting to %s", APP_SERVER_DOMAIN);
			fd = server_connect(APP_SERVER_DOMAIN);
			if(fd == -1){
				osDelay(2000);
			}
		} while(fd == -1);

		// Notification
		app_state_notification(APP_SERVER_CON);

		// Connected: start the send side; a failure is reported but the
		// receive loop still runs
		log_debug_printf("server fd = %d", fd);
		if(server_notification_establish(&fd) == -1){
			log_error_printf("thread notification create error\n");
			app_state_notification(APP_ERR);
		}

		// Receive server messages; returns once the connection is over
		server_message_receive(fd);

		// Clean up after the disconnect
		server_notification_destroy();
		close(fd);
		fd = -1;

		//Notification
		app_state_notification(APP_SERVER_DISCON);
	}
}

//
// Receive messages sent by the server until the connection ends.
// Polls the socket every 20 (osDelay) with a zero-timeout select() and hands
// each received packet to server_post_handler().
// fd: socket fd to listen on
// Returns -1 in every case: select error, socket exception, recv() returning
// <= 0, or no data for longer than APP_SERVER_TIMEOUT (counted in steps of 20
// per idle poll).
//
static int server_message_receive(int fd){
	fd_set read_fds;
	fd_set err_fds;
	
	struct timeval_t timeout = {0, 0};
	char packet[APP_SERVER_PACKET_LEN_MAX];
	int interval = 0;
	while(1){
		FD_ZERO(&read_fds);
		FD_ZERO(&err_fds);
		FD_SET(fd, &read_fds);
		FD_SET(fd, &err_fds);
		osDelay(20);
		switch(select(fd+1, &read_fds, NULL, &err_fds, &timeout)){
			case -1:
				log_error_printf("server: select error\n");
				return -1;
			case 0:
				// Nothing pending: accumulate idle time
				interval += 20;
				break;
			default:
				if(FD_ISSET(fd, &read_fds)){
					interval = 0;
					// Leave one byte free for the terminator written below
					int len = recv(fd, packet, sizeof(packet) - 1, 0);
					if (len > 0){
						packet[len] = '\0';
						log_info_printf("packet: %s", packet); 
						server_post_handler(packet);
					} else {
						return -1;
					}
				} else if(FD_ISSET(fd, &err_fds)){
					// Exception reported without readable data: give up on this
					// socket instead of polling it forever
					log_error_printf("server: socket error\n");
					return -1;
				}
				break;
		}
		if(interval > APP_SERVER_TIMEOUT){
			log_info_printf("server: timeout");
			return -1;
		}
		// Reset in case select() modified the timeout
		timeout.tv_sec =  0;
		timeout.tv_usec = 0;
	}
}


//
// Parse one string pushed by the server and dispatch it to the first
// subscriber whose service name equals the message type.
// post_str: string delivered by the server
// Always returns 0; a parse failure is only logged.
//
static int server_post_handler(char * post_str){
	message_post_t	message_post;
	if(message_post_parse(&message_post, post_str) == 0) {
		for(int i=0; i<APP_SERVICE_CNT_MAX; i++){
			// Unused slots have an empty name and a NULL callback; skip them
			// so a message with an empty type cannot call through NULL
			if(server_post_arr[i].callback_fn == NULL){
				continue;
			}
			if(strcmp(message_post.type, server_post_arr[i].service) == 0){
				server_post_arr[i].callback_fn(message_post.type, message_post.data);
				break;
			}
		}
	} else {
		log_info_printf("server: post error");
	}
	// Release the memory allocated while parsing the message
	// NOTE(review): this also runs after a failed parse — confirm that
	// message_post_release() tolerates that
	message_post_release(&message_post);
	return 0;
}

//
// Connect to the server
// domain: server address in the form host:port, default port 80. The host
//         part is passed to inet_addr(), and the whole string must be shorter
//         than 64 characters.
// Returns the connected socket fd, or -1 on an invalid argument, socket
// creation failure or connect failure.
//
static int server_connect(char *domain){
	// Validate and copy the address before acquiring the socket, so the early
	// exits have nothing to release
	char domain_t[64] = { 0 };
	if(domain == NULL || strlen(domain) >= sizeof(domain_t)){
		log_error_printf("server: invalid domain");
		return -1;
	}
	strcpy(domain_t, domain);

	int fd = socket(AF_INET, SOCK_STREAM, 0);
	if (fd == -1){
		log_error_printf("server: create socket fail");
		return -1;
	}

	// Connect timeout
	struct timeval_t timeout;
	timeout.tv_sec = 5;
	timeout.tv_usec = 0;
	setsockopt(fd,0,SO_CONTIMEO,&timeout,sizeof(struct timeval_t));

	// NOTE(review): 0 is assumed to select blocking mode for SO_BLOCKMODE —
	// confirm against the mxchipWNET header. The value was previously
	// uninitialised.
	int opt = 0;
	setsockopt(fd, 0, SO_BLOCKMODE, &opt, sizeof(opt));

	//Port
	int port = 80;
	char *pos = strstr(domain_t, ":");
	if (pos) {
		// A failed conversion leaves the default port in place
		sscanf(pos, ":%d", &port);
		pos[0] = '\0';
	}
	
	//addr
	struct sockaddr_t addr;
	// Only s_ip and s_port are set here; zero the rest rather than passing
	// stack garbage to connect()
	memset(&addr, 0, sizeof(addr));
	addr.s_ip = inet_addr(domain_t);
	addr.s_port = port;
	if(connect(fd, &addr, sizeof(addr))){
		log_debug_printf("server: connect fail");
		close(fd);
		fd = -1;
	}
	return fd;
}

//
// Server notification thread: waits on the send queue and transmits the
// contents of packet_send each time a message arrives. Never returns.
// argument: pointer to the socket fd (read once when the thread starts)
//
static void thread_server_notification(void const *argument){
	const int sock = *(int *)argument;
	log_debug_printf("thread: server notification fd = %d", sock);

	for(;;){
		osEvent evt = osMessageGet(send_queue_id, 3000);

		if (evt.status == osEventMessage){
			send(sock, packet_send, strlen(packet_send), 0);
			log_info_printf("packet: %s", packet_send);
		}
		// Released after every wake-up, whether a message arrived or the
		// wait timed out
		osSemaphoreRelease(send_sempahore_id);
	}
}


// 
// Create the notification (send) service: send queue, notification thread
// and send semaphore.
// fd: pointer to the socket fd, handed to the notification thread
// Returns 0 on success. On failure returns -1 with everything created so far
// destroyed again and the handles cleared.
// 
static int server_notification_establish(int *fd){
	// The queue comes first: the notification thread waits on it as soon as
	// it runs
	osMessageQDef(sevNotification, 1, sizeof(char *));
	send_queue_id = osMessageCreate(osMessageQ(sevNotification), NULL);
	if(send_queue_id == NULL){
		return -1;
	}

	osThreadDef(Notification, thread_server_notification, osPriorityNormal, 0, 256);
	server_notification_id =  osThreadCreate(osThread(Notification), fd);
	if(server_notification_id == NULL) {
		server_notification_destroy();
		return -1;
	}

	// The semaphore comes last: server_send_data() treats a non-NULL
	// semaphore as "service available", so it must not appear before the
	// queue and the thread are in place
	osSemaphoreDef(sevNotification);
	send_sempahore_id = osSemaphoreCreate(osSemaphore(sevNotification), 1);
	if(send_sempahore_id == NULL){
		server_notification_destroy();
		return -1;
	}
	return 0;
}

//
// Destroy the notification (send) service: terminates the notification
// thread and deletes the send semaphore and send queue. Each handle is
// cleared after it is released, and handles that are already NULL are
// skipped. Always returns 0.
//
static int server_notification_destroy(void){
	if(server_notification_id){
		osThreadTerminate(server_notification_id);
		server_notification_id = NULL;
	}

	if(send_sempahore_id){
		osSemaphoreDelete(send_sempahore_id);
		send_sempahore_id = NULL;
	}

	if(send_queue_id){
		vQueueDelete(send_queue_id);
		send_queue_id = NULL;
	}

	// Wait for the thread to be destroyed
	osDelay(100);

	return 0;
}

//
// Send data to the server
// service: service name (stored as the message type)
// data: payload for that service
// Encodes the message into packet_send and queues it for the notification
// thread, which sends it and then releases the send semaphore.
// Returns 0 when the message was queued. Returns -1 when the send service is
// not running, the semaphore is not obtained within 2000, encoding fails, or
// the queue does not accept the message within 2000.
//
int server_send_data(char * service, char *data){
	// The semaphore and the queue exist only while the notification service
	// is up
	if(send_sempahore_id == NULL || send_queue_id == NULL){
		return -1;
	}
	
	// Take the semaphore: it guards packet_send until the packet has been sent
	if(osSemaphoreWait(send_sempahore_id, 2000) != osOK) {
		return -1;
	}
	
	message_send_t message_send;
	message_send.type = service;
	message_send.date = data;
	if(message_encode(packet_send, &message_send) == -1) {
		// Nothing was queued, so the notification thread will not release
		// the semaphore for this request; do it here
		osSemaphoreRelease(send_sempahore_id);
		return -1;
	}	
	
	// Queue the message for sending
	if(osMessagePut(send_queue_id, (unsigned int)packet_send, 2000) != osOK){
		osSemaphoreRelease(send_sempahore_id);
		return -1;
	}
	
	return 0;
}


